Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Allow pushing and pulling XCom from Task Context #45075

Merged
merged 1 commit into from
Dec 24, 2024

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Dec 19, 2024

Part of #44481

There is a lot of cleanup to do but I wanted to get a basic DAG that uses XCom working first.

Example DAG used: tutorial_dag

from __future__ import annotations

# [START tutorial]
# [START import_module]
import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.providers.standard.operators.python import PythonOperator

# [END import_module]

# [START instantiate_dag]
with DAG(
    "tutorial_dag",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    # [END default_args]
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]
    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)

    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task

image image
image

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kaxil kaxil changed the title AIP-72: Allow pushing and pulling XCom from a Task AIP-72: Allow pushing and pulling XCom from Task Context Dec 19, 2024
@kaxil kaxil force-pushed the conn-context branch 4 times, most recently from ed34297 to 3a60948 Compare December 20, 2024 12:43
kaxil added a commit that referenced this pull request Dec 20, 2024
It fixes the following bug

```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
	{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
	{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	...
```

To reproduce just run `tutorial_dag` or the following minimal dag:

```python
import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
    "sdk_tutorial_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    extract_task
```

I need this fix for #45075 (part of the getting [Task Context working with AIP-72](#44481))
@kaxil kaxil force-pushed the conn-context branch 7 times, most recently from dc62eb8 to cc78190 Compare December 24, 2024 08:47
@kaxil kaxil marked this pull request as ready for review December 24, 2024 08:49
@kaxil kaxil requested review from ashb and amoghrajesh December 24, 2024 08:49
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few nits

task_sdk/src/airflow/sdk/api/client.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/api/client.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/execution_time/task_runner.py Outdated Show resolved Hide resolved
@kaxil kaxil merged commit 0917498 into apache:main Dec 24, 2024
50 checks passed
@kaxil kaxil deleted the conn-context branch December 24, 2024 11:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants